1 支持的数据类型
数据类型 | 描述 | 值域 |
---|---|---|
VARCHAR | 可变长度字符串 | 最大容量为4mb |
BOOLEAN | 逻辑值 | 值:TRUE,FALSE,UNKNOWN |
TINYINT | 微整型 | 1字节整数 范围是-128到127 |
SMALLINT | 短整型,2字节整数 | 范围为-32768至32767 |
INT | 整型,4字节整数 | 范围是-2147483648到2147483647 |
BIGINT | 长整型,8字节整数 | 范围是-9223372036854775808至9223372036854775807 |
FLOAT | 4字节浮点数 | 6位数字精度 |
DECIMAL | 小数类型 | 示例:123.45是DECIMAL(5,2)值。 |
DOUBLE | 浮点,8字节浮点数 | 15位十进制精度 |
DATE | 日期类型 | 示例:DATE'1969-07-20' |
TIME | 时间类型 | 示例:TIME '20:17:40' |
TIMESTAMP | 时间戳,日期和时间 | 示例:TIMESTAMP'1969-07-20 20:17:40' |
VARBINARY | 二进制数据 | 即 byte[] 数组 |
2 DDL语句
2.1 Source Table 定义数据源表
Kafka Source Table
目前只支持kafka
CREATE SOURCE TABLE orders (
userid '/user/userid' varchar,
money bigint
)
WITH (
type = 'kafka', -- 固定值
topic = 'flink-topic-input', -- kafka topic
encode = 'json', -- 支持kafka 消息格式为:json 和 csv 两种格式,json 支持嵌套,但需要在column定义时,指定path,如果上面实例中:/user/userid
delimiter = '|', -- 如果encode为csv,设置分隔符,默认为逗号
kafka.bootstrap.servers = 'dp88:9092,dp253:9092,dp254:9092', --开发borker server地址
kafka.group.id = 'flink-example-group' -- 消费组名称
)
TIMESTAMP BY proctime proctime;
2.2 Sink Table 定义数据输出表
Kafka Sink Table 案例
CREATE SINK TABLE stat_orders (
window_start TIMESTAMP,
window_end TIMESTAMP,
userid varchar,
total_money bigint
)
WITH (
type = 'kafka', -- 固定值
topic = 'flink-topic-output', -- kafka topic
encode = 'json', -- 支持kafka 消息格式为:json 和 csv 两种格式。
delimiter = '|', -- 如果encode为csv,设置分隔符,默认为逗号
kafka.bootstrap.servers = 'dp88:9092,dp253:9092,dp254:9092', --开发borker server地址
)
TIMESTAMP BY proctime proctime;
InfluxDb Sink Table案例
CREATE SINK TABLE stat_orders (
_measurement varchar, -- _measurement字段必须要存在
_tag1 varchar, --tag字段可选,tag字段必须以_tag开头,若有多个tag,在_tag后面加数字
_tag2 varchar,
cpu varchar, -- field的名称
io varchar -- field的名称
)
WITH (
type = 'influxdb', -- 固定值
influxdb.url = http://10.57.17.82:8086, --必选参数
influxdb.username = test, --必选参数
influxdb.password = test, --必选参数
influxdb.database = testdb, --必选参数
influxdb.retention.policy = default, --非必选参数
influxdb.batch.actions = 100 --触发批次写入的条数, 非必须参数
influxdb.flush.duration = 100 --触发批次写入的时长(单位: ms), 非必须参数
influxdb.compress.gz = true --开启gz压缩,默认为false,非必须参数
)
TIMESTAMP BY proctime proctime;
Aerospike Sink Table
第一个字段作为主键
CREATE SINK TABLE stat_orders (
userid varchar,
total_money bigint
)
WITH (
type = 'aerospike', -- 固定值
aerospike.zookeep.servers = 'xxx' --使用公司封装的client,通过zk获取asp连接地址
aerospike.code = 'xxx',
aerospike.key.ttl = 3600000 --过期时间
)
TIMESTAMP BY proctime proctime;
3 DML语句
INSERT INTO语句
语法格式
INSERT INTO tableName(,tableName)* queryStatement;
queryStatement 完成语法以及相关函数,请参考flik官方文档
示例:
INSERT INTO LargeOrders
SELECT * FROM Orders WHERE units > 1000;
INSERT INTO Orders(z, v)
SELECT c,d FROM OO;
说明
一个作业支持一个source table, 一个insert sql,支持多个sink table, 目的是一次计算,输出到多个sink table中
流计算不支持单独的select 查询,必须有CREATE VIEW 或这是在 INSERT INTO内才能操作。
INSERT INTO 支持UPDATA更新,例如向TIDB的表插入一个KEY值,如果这个KEY值存在就会更新;如果不存在就会插入一条新的KEY值。
4 SQL example
利用自定义函数对数据进行清洗加工
create function demoFunc as 'cn.tongdun.streamcompute.metrics.UdtfTest' USING streamcompute-udf-1.0-SNAPSHOT.jar;
CREATE SOURCE TABLE orders (
userid varchar,
money bigint
)
WITH (
type = 'kafka',
topic = 'sc-dev-topic-input1',
encode = 'csv',
-- delimiter = '|',
kafka.bootstrap.servers = 'dp88:9092,dp253:9092,dp254:9092',
kafka.group.id = 'streamcompute-dev-group1'
)
TIMESTAMP BY proctime proctime;
CREATE SINK TABLE stat_orders_kafka (
window_start TIMESTAMP,
window_end TIMESTAMP,
userid varchar,
total_money bigint
)
WITH (
type = 'kafka',
topic = 'sc-dev-topic-output',
encode = 'csv',
kafka.bootstrap.servers = 'dp88:9092,dp253:9092,dp254:9092'
);
insert into stat_orders_kafka
SELECT
TUMBLE_START(proctime, INTERVAL '10' SECOND),
TUMBLE_END(proctime, INTERVAL '10' SECOND),
newuserid,
SUM(money) as total_money
FROM (select userid, newuserid, money, proctime from orders LEFT JOIN LATERAL TABLE(demoFunc(userid)) as T(newuserid) ON TRUE) a
GROUP BY TUMBLE(proctime, INTERVAL '10' SECOND),newuserid;